package cn.doitedu.rtmk.demo.demo4;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class Demo4 {

    public static void main(String[] args) throws Exception {
        // 构建编程环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        // 读 kafka中的日志明细数据,接收用户的行为
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-test-data")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> beans = ds.map(json -> JSON.parseObject(json, EventBean.class));


        // 读取kafka中的规则参数
        KafkaSource<String> paramDataSource = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-rule-param")
                .build();
        DataStreamSource<String> paramData = env.fromSource(paramDataSource, WatermarkStrategy.noWatermarks(), "s");

        // 广播状态的结构==>  key:String规则id ，value:RuleCalculator 规则运算机对象
        MapStateDescriptor<String, String> bcStateDesc = new MapStateDescriptor<>("param_bc_state", String.class, String.class);
        BroadcastStream<String> paramBc = paramData.broadcast(bcStateDesc);

        SingleOutputStreamOperator<RtmkMessage> res =
                beans.keyBy(EventBean::getUserId)
                        .connect(paramBc)
                        .process(new KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>() {

                            /**
                             *  raw-state , 不会被flink管理，也不会被快照checkpoint
                             *  这个map，要被 processBroadcastElement和  processElement ，同时使用，
                             *  为了防止  processElement在使用一个规则运算机的过程中，突然被  processBroadcastElement 更新或替换，
                             *  所以，使用并发安全的ConcurrentHashMap
                             */
                            final Map<String,RuleModelCalculator> calculatorMap = new ConcurrentHashMap<>();

                            @Override
                            public void open(Configuration parameters) throws Exception {


                            }

                            @Override
                            public void processElement(EventBean eventBean, KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>.ReadOnlyContext ctx, Collector<RtmkMessage> out) throws Exception {
                                /**
                                 *  用于故障后恢复全局状态机列表
                                 */
                                if (calculatorMap.size() == 0) {
                                    ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
                                    ParamJsonUtil.restoreCalculators(broadcastState,calculatorMap,getRuntimeContext());
                                }

                                /**
                                 * 用于测试异常后的容错能力
                                 */
                                if(RandomUtils.nextInt(1,20)%8 ==0 ){
                                    log.error("发生异常了......");
                                    throw new RuntimeException("哈哈哈哈哈哈");
                                }


                                /**
                                 * 遍历每一个规则来调用运算
                                 */
                                for (Map.Entry<String,RuleModelCalculator> entry: calculatorMap.entrySet()) {
                                    log.warn("正准备调用规则,规则ID为: {}", entry.getKey());
                                    entry.getValue().calc(eventBean, out);
                                }
                            }




                            @Override
                            public void processBroadcastElement(String paramJson, KeyedBroadcastProcessFunction<Integer, EventBean, String, RtmkMessage>.Context ctx, Collector<RtmkMessage> out) throws Exception {

                                JSONObject jsb = JSON.parseObject(paramJson);
                                log.warn("接收到规则参数,所属模型:{},规则ID:{}",jsb.getString("ruleModelId"),jsb.getString("ruleId"));

                                BroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);

                                /**
                                 * 1. 如果 calculatorMap 为空
                                 * 则有可能是系统第一次启动，也有可能是故障后的重启
                                 * 需要做一次恢复运算机的动作
                                 */
                                if(calculatorMap.size() == 0 ){
                                    ParamJsonUtil.restoreCalculators(broadcastState,calculatorMap,getRuntimeContext());
                                }

                                /**
                                 * 2. 然后，为本次接收到的规则json，进行处理
                                 */
                                // 接收到参数json，马上存入广播状态
                                JSONObject jsonObject = JSON.parseObject(paramJson);
                                String ruleId = jsonObject.getString("ruleId");
                                broadcastState.put(ruleId, paramJson);


                                // 根据传入的参数json，构造运算机
                                RuntimeContext runtimeContext = getRuntimeContext();
                                RuleModelCalculator calculator = ParamJsonUtil.generateCalculator(paramJson, runtimeContext);

                                // 将运算机加入全局列表
                                if (calculator != null) {
                                    calculatorMap.put(ruleId,calculator);
                                }

                            }

                        });


        res.print();

        env.execute();


    }


}
